/*###################################################################
  > File Name: volume_ctl_cleanup.c
  > Author: Vurtune
  > Mail: vurtune@foxmail.com
  > Created Time: Fri 06 Oct 2017 01:31:18 AM PDT
###################################################################*/

#include "config.h"

#define DBG_SUBSYS S_LIBCONTROL

#include "lich_api.h"

#include "volume_ctl.h"
#include "volume_ctl_internal.h"
#include "cleanup_offline_msg.h"

/*
 * Compile-time selection of the snapshot cleanup implementation below:
 * SNAPSHOT_CLEANUP_RANGE is tested first, then SNAPSHOT_CLEANUP_CHUNK;
 * when both are 0 the plain per-chunk loop variant is compiled instead.
 */
#define SNAPSHOT_CLEANUP_RANGE 0
#define SNAPSHOT_CLEANUP_CHUNK 1

/*
 * VOLUME_CLEANUP_DISCARD is forced off here, so every
 * "#ifdef VOLUME_CLEANUP_DISCARD" section of this file is compiled out.
 */
#undef VOLUME_CLEANUP_DISCARD

/*
 *SNAPSHOT
 */

#if SNAPSHOT_CLEANUP_RANGE

#define CLEANUP_PARALLELS 100

/* Per-chunk work item used by the range-parallel snapshot cleanup. */
typedef struct {
        /* result stored by the task body: 0 or an error code */
        int retval;
        /* counter shared by all items of a range; decremented when a task ends */
        int *left;
        /* task resumed through schedule_resume() once *left reaches 0 */
        const task_t *task;
        /* volume cache entry that is read-locked around the cleanup call */
        mcache_entry_t *cent;
        const nid_t *distnid;
        const nid_t *srcnid;
        const volid_t *distid;
        const volid_t  *srcid;
        /* index of the chunk handled by this item */
        uint64_t idx;
        /* set to 1 when the item is queued, cleared when its task ends */
        int running;
        /* copied from fileinfo->size by the spawner */
        uint64_t size;
        /* copied from fileinfo->ec by the spawner */
        ec_t ec;
        /* gettime() taken when the item was queued */
        time_t ctime;
} cleanup_arg_t;

/*
 * Task body started once per chunk by __volume_ctl_snapshot_cleanup_bh_range().
 *
 * @_args: cleanup_arg_t describing the chunk to clean.
 *
 * Calls volume_proto->snapshot_cleanup_chunk() for arg->idx while holding the
 * volume read lock. ENOENT/ENOKEY from that call and an index beyond the chunk
 * count are both treated as success. The outcome is stored in arg->retval,
 * arg->running is cleared, the shared counter *arg->left is decremented and,
 * when it reaches 0, the waiting task is resumed.
 *
 * NOTE(review): size2chknum() and snapshot_cleanup_chunk() are called here with
 * fewer arguments than in the SNAPSHOT_CLEANUP_CHUNK variant of this file;
 * this branch is compiled out, so confirm the signatures before enabling it.
 */
STATIC void __volume_ctl_snapshot_cleanup_bh_range__(void *_args)
{
        int ret;
        cleanup_arg_t *arg = _args;
        mcache_entry_t *cent = arg->cent;
        const nid_t *distnid = arg->distnid, *srcnid = arg->srcnid;
        const volid_t *distid = arg->distid, *srcid = arg->srcid;
        uint64_t chknum = size2chknum(arg->size), idx = arg->idx;
        volume_proto_t *volume_proto;

        ANALYSIS_BEGIN(0);

        volume_proto = cent->value;
        /* the last range may extend past the end of the volume: nothing to do */
        if (idx >= chknum)
                goto out;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* progress log every 1000 chunks and for the final chunk */
        if (idx % 1000 == 0 || idx == chknum - 1) {
                DINFO("cleanup chunk "CHKID_FORMAT" [%llu/%llu]\n",
                                CHKID_ARG(srcid), (LLU)idx, (LLU)chknum);
        }

        ret = volume_proto->snapshot_cleanup_chunk(srcnid, srcid,
                        distnid, distid, idx , arg->size);
        if (unlikely(ret)) {
                /* ENOENT/ENOKEY are not reported as errors for this chunk */
                if (ret == ENOENT || ret == ENOKEY) {
                        __volume_ctl_unlock(cent);
                        goto out;
                } else
                        GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(cent);

out:

        /* NOTE(review): ANALYSIS_END(0, ...) is issued here and again below on
         * this same path — confirm the duplicate is intended. */
        ANALYSIS_END(0, IO_WARN, NULL);

        arg->retval = 0;
        arg->running = 0;
        *(arg->left) = *(arg->left) - 1;
        YASSERT(*(arg->left) >= 0);

        DBUG("cleanup "CHKID_FORMAT"[%llu], left %u \n",
                        CHKID_ARG(srcid), (LLU)idx, *(arg->left));

        /* last task of the range wakes the spawner */
        if (*(arg->left) == 0) {
                schedule_resume(arg->task, 0, NULL);
        }

        ANALYSIS_END(0, IO_WARN, NULL);

        return;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        /* failure path: same bookkeeping as above, but keep the error code */
        arg->retval = ret;
        arg->running = 0;
        *(arg->left) = *(arg->left) - 1;
        YASSERT(*(arg->left) >= 0);
        if (*(arg->left) == 0) {
                schedule_resume(arg->task, 0, NULL);
        }

        ANALYSIS_END(0, IO_WARN, NULL);
        return;
}

/*
 * Check callback handed to schedule_yield1() by
 * __volume_ctl_snapshot_cleanup_bh_range().
 *
 * @_args: array of CLEANUP_PARALLELS cleanup_arg_t items.
 *
 * Logs every item that was queued more than 2 * gloconf.rpc_timeout ago and
 * asserts that at least one item is still marked running.
 */
STATIC void __volume_ctl_snapshot_cleanup_bh_range_check(void *_args)
{
        int i, running = 0;
        cleanup_arg_t *args, *arg;
        time_t ctime = gettime();

        args = _args;
        for (i = 0; i < CLEANUP_PARALLELS; i++) {
                arg = &args[i];

                if (ctime - arg->ctime > gloconf.rpc_timeout * 2) {
                        DINFO("volume "CHKID_FORMAT"[%llu] range %llu running %u, retval %u\n",
                                        CHKID_ARG(arg->srcid), (LLU)arg->idx,
                                        (LLU)arg->idx / CLEANUP_PARALLELS,  arg->running, arg->retval);
                }

                running += arg->running;
        }

        /* arg still points at the last item; every item shares the same counter */
        DBUG("left %u running %u\n", *(arg->left), running);
        YASSERT(running);
}

/*
 * Clean one range of CLEANUP_PARALLELS consecutive chunks in parallel.
 *
 * @cent:     volume cache entry
 * @srcnid/@srcid:   node and id passed through to snapshot_cleanup_chunk()
 * @distnid/@distid: destination node and id passed through as well
 * @range:    range number; chunks [range * CLEANUP_PARALLELS,
 *            (range + 1) * CLEANUP_PARALLELS) are processed
 * @fileinfo: supplies the size and ec copied into each work item
 *
 * One task is queued per chunk, then the caller yields until the last task
 * resumes it. Returns 0 when every item finished with retval 0, otherwise the
 * yield error or the first non-zero item retval.
 *
 * NOTE(review): the work items and the counter live on this stack frame; if
 * schedule_yield1() returns an error while tasks are still pending they would
 * keep pointers into a dead frame — confirm this cannot happen.
 */
STATIC int __volume_ctl_snapshot_cleanup_bh_range(mcache_entry_t *cent,
                const nid_t *srcnid, const volid_t *srcid,
                const nid_t *distnid, const volid_t *distid,
                uint64_t range, fileinfo_t *fileinfo)
{
        int ret, count;
        cleanup_arg_t args[CLEANUP_PARALLELS], *arg;
        task_t task;
        uint64_t i;

        DBUG("cleanup "CHKID_FORMAT"[%llu, %llu)\n",
                        CHKID_ARG(srcid), (LLU)range * CLEANUP_PARALLELS,
                        (LLU)(range + 1) * CLEANUP_PARALLELS);

        task = schedule_task_get();
        /* number of outstanding tasks, shared through arg->left */
        count = CLEANUP_PARALLELS;
        for (i = 0; i < CLEANUP_PARALLELS; i++) {
                arg = &args[i];
                arg->left = &count;
                arg->task = &task;
                arg->cent = cent;
                arg->srcid = srcid;
                arg->srcnid = srcnid;
                arg->distid = distid;
                arg->distnid = distnid;
                arg->idx = range * CLEANUP_PARALLELS + i;
                arg->running = 1;
                arg->size = fileinfo->size;
                arg->ec = fileinfo->ec;
                arg->ctime = gettime();
                schedule_task_new("volume_cleanup_range",
                                __volume_ctl_snapshot_cleanup_bh_range__, arg, -1);
        }

        /* wait for the last task to call schedule_resume() */
        ret = schedule_yield1("cleanup_range", NULL, (void *)args,
                              __volume_ctl_snapshot_cleanup_bh_range_check, 120);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* report the first failed item, if any */
        for (i = 0; i < CLEANUP_PARALLELS; i++) {
                arg = &args[i];
                YASSERT(arg->running == 0);
                if (arg->retval) {
                        ret = arg->retval;
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Range-parallel variant: clean every chunk of the source snapshot.
 *
 * @cent:            volume cache entry
 * @srcnid/@srcid:   node and id of the source; its attributes give the size
 * @distnid/@distid: destination node and id, passed through unchanged
 *
 * The source attributes are fetched locally or over RPC depending on
 * net_islocal(srcnid); the chunk count is then split into ranges of
 * CLEANUP_PARALLELS chunks, each handled by
 * __volume_ctl_snapshot_cleanup_bh_range().
 *
 * Returns 0 on success or the first error code encountered.
 */
STATIC int __volume_ctl_snapshot_cleanup_bh1__(mcache_entry_t *cent,
                const nid_t *srcnid, const volid_t *srcid,
                const nid_t *distnid, const volid_t *distid)
{
        int ret;
        fileinfo_t fileinfo;
        uint64_t ranges, i, chknum;

        ANALYSIS_BEGIN(0);

        DINFO("cleanup "CHKID_FORMAT"\n", CHKID_ARG(srcid));

        if (net_islocal(srcnid)) {
                /* srcid is already a pointer: pass it as is, not its address */
                DBUG("chunk %s %p\n", id2str(srcid), srcid);
                ret = stor_ctl_getattr(srcid, &fileinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_rpc_getattr(srcnid, srcid, &fileinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        /* number of ranges, rounded up so a partial last range is covered */
        chknum = size2chknum(fileinfo.size, &fileinfo.ec);
        ranges = chknum / CLEANUP_PARALLELS;
        if (chknum % CLEANUP_PARALLELS)
                ranges++;

        for (i = 0; i < ranges; i++) {
                ret = __volume_ctl_snapshot_cleanup_bh_range(cent,
                                srcnid,  srcid, distnid,
                                distid, i, &fileinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}

#elif SNAPSHOT_CLEANUP_CHUNK
/*
 * State carried from __volume_ctl_snapshot_cleanup_chunk_bh1() (which fills
 * it) to the per-chunk __volume_ctl_snapshot_cleanup_chunk_bh2() calls.
 */
typedef struct {
        /* source node and id, taken from the chkinfo returned by bh_src */
        nid_t srcnid;
        volid_t srcid;
        /* storage for the destination node/id returned by bh_dist */
        nid_t _distnid;
        volid_t _distid;
        /* point at _distnid/_distid, or NULL when bh_dist returned ENOENT */
        nid_t *distnid;
        volid_t *distid;
        /* index of the chunk to clean in the current bh2 call */
        int idx;
        /* size and ec copied from the source attributes */
        uint64_t size;
        ec_t ec;
} cleanup_arg_t;

/*
 * First stage of the chunk-wise snapshot cleanup, run through core_request().
 *
 * Variadic arguments, in order:
 *   const volid_t *volid  - volume to operate on
 *   const char *name      - snapshot name
 *   cleanup_arg_t *arg    - output, filled for the bh2 stage
 *
 * With the volume read lock held it calls
 * volume_proto_snapshot_cleanup_bh_src() to obtain the source chkinfo and
 * volume_proto_snapshot_cleanup_bh_dist() for the destination node/id.
 * ENOENT from the latter is accepted and leaves arg->distnid/distid NULL.
 * After dropping the lock the source attributes are fetched (locally or via
 * RPC) and their size/ec are stored in @arg.
 *
 * Returns 0 on success or the first error code encountered.
 */
STATIC int __volume_ctl_snapshot_cleanup_chunk_bh1(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);
        cleanup_arg_t *arg = va_arg(ap, cleanup_arg_t *);

        va_end(ap);

        int ret;
        fileinfo_t attr;
        mcache_entry_t *entry;
        volume_proto_t *volume_proto;
        char chkbuf[CHKINFO_MAX];
        chkinfo_t *src = (void *)chkbuf;

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = entry->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        /* source side */
        ret = volume_proto_snapshot_cleanup_bh_src(volume_proto, name, src, NULL);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        arg->srcid = src->id;
        arg->srcnid = src->diskid[0].id;

        /* destination side; a missing destination is tolerated */
        ret = volume_proto_snapshot_cleanup_bh_dist(volume_proto, &src->id,
                        &arg->_distnid, &arg->_distid);
        if (ret == 0) {
                arg->distnid = &arg->_distnid;
                arg->distid = &arg->_distid;
        } else if (ret == ENOENT) {
                arg->distnid = NULL;
                arg->distid = NULL;
        } else {
                GOTO(err_lock, ret);
        }

        __volume_ctl_unlock(entry);

        /* attributes are read without the volume lock */
        if (net_islocal(&arg->srcnid)) {
                DBUG("chunk %s %p\n", id2str(&arg->srcid), &arg->srcid);
                ret = stor_ctl_getattr(&arg->srcid, &attr);
        } else {
                ret = stor_rpc_getattr(&arg->srcnid, &arg->srcid, &attr);
        }

        if (unlikely(ret))
                GOTO(err_release, ret);

        arg->ec = attr.ec;
        arg->size = attr.size;

        __volume_ctl_release(entry);
        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

STATIC int __volume_ctl_snapshot_cleanup_chunk_bh2(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        cleanup_arg_t *arg = va_arg(ap, cleanup_arg_t *);
        ec_t *ec;

        va_end(ap);

        int ret, deleting;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        deleting = volume_proto->table1.fileinfo.attr & __FILE_ATTR_DELETE__;
        if (unlikely(deleting)) {
                ret = ECANCELED;
                GOTO(err_lock, ret);
        }

        ec = &volume_proto->table1.fileinfo.ec;
        ret = volume_proto->snapshot_cleanup_chunk(&arg->srcnid, &arg->srcid,
                        arg->distnid, arg->distid, arg->idx, arg->size, ec);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);
        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * Chunk-wise snapshot cleanup driver.
 *
 * @volid: volume owning the snapshot
 * @name:  snapshot name
 *
 * Runs the bh1 stage once to collect source/destination and size, then
 * issues one bh2 core_request() per chunk. ENOENT/ENOKEY for a single chunk
 * are skipped; any other error aborts the loop.
 *
 * Returns 0 on success or the first fatal error code.
 */
STATIC int __volume_ctl_snapshot_cleanup_chunk(const volid_t *volid, const char *name)
{
        int ret, idx, total;
        cleanup_arg_t ctx;

        ret = core_request(core_hash(volid), -1, "snapshot_cleanup_chunk_bh1",
                        __volume_ctl_snapshot_cleanup_chunk_bh1, volid, name, &ctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        total = size2chknum(ctx.size, &ctx.ec);

        for (idx = 0; idx < total; idx++) {
                ctx.idx = idx;

                ret = core_request(core_hash(volid), -1, "snapshot_cleanup_chunk_bh2",
                                __volume_ctl_snapshot_cleanup_chunk_bh2, volid, &ctx);

                /* success, or a chunk that has nothing to clean: next one */
                if (ret == 0 || ret == ENOENT || ret == ENOKEY)
                        continue;

                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

#else //#if SNAPSHOT_CLEANUP_RANGE #elif SNAPSHOT_CLEANUP_CHUNK

/*
 * Fallback variant (neither RANGE nor CHUNK selected): clean every chunk of
 * the source snapshot sequentially.
 *
 * @cent:            volume cache entry, read-locked around each chunk call
 * @srcnid/@srcid:   source node and id; its attributes give the size
 * @distnid/@distid: destination node and id, passed through unchanged
 *
 * ENOENT/ENOKEY for a single chunk are skipped. Returns 0 on success or the
 * first other error code.
 *
 * NOTE(review): the getattr failures jump to err_ret before ANALYSIS_BEGIN(1)
 * has run, yet err_ret issues ANALYSIS_END(1, ...) — confirm this is valid
 * for the ANALYSIS_* macros. size2chknum()/snapshot_cleanup_chunk() are also
 * called with fewer arguments than in the CHUNK variant of this file.
 */
STATIC int __volume_ctl_snapshot_cleanup_bh1__(mcache_entry_t *cent,
                const nid_t *srcnid, const volid_t *srcid,
                const nid_t *distnid, const volid_t *distid)
{
        int ret, i, chknum;
        volume_proto_t *volume_proto;
        fileinfo_t fileinfo;

        ANALYSIS_BEGIN(0);

        if (net_islocal(srcnid)) {
                ret = stor_ctl_getattr(srcid, &fileinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_rpc_getattr(srcnid, srcid, &fileinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ANALYSIS_END(0, IO_WARN, NULL);

        chknum = size2chknum(fileinfo.size);

        ANALYSIS_BEGIN(1);

        for (i = 0; i < chknum; i++) {
                /* the lock is taken and dropped per chunk */
                ret = __volume_ctl_rdlock(cent);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* progress log every 1000 chunks and for the final chunk */
                if (i % 1000 == 0 || i == chknum - 1) {
                        DINFO("cleanup chunk "CHKID_FORMAT" [%u/%u]\n",
                                        CHKID_ARG(srcid), i, chknum);
                }

                volume_proto = cent->value;
                ret = volume_proto->snapshot_cleanup_chunk(srcnid, srcid,
                                distnid, distid, i, fileinfo.size);
                if (unlikely(ret)) {
                        if (ret == ENOENT || ret == ENOKEY) {
                                __volume_ctl_unlock(cent);
                                continue;
                        } else
                                GOTO(err_lock, ret);
                }

                __volume_ctl_unlock(cent);
        }

        ANALYSIS_END(1, IO_WARN, NULL);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        ANALYSIS_END(1, IO_WARN, NULL);
        return ret;
}

#endif //#if SNAPSHOT_CLEANUP_RANGE #elif SNAPSHOT_CLEANUP_CHUNK

#if !SNAPSHOT_CLEANUP_CHUNK
/*
 * Non-CHUNK variants only: resolve source and destination of snapshot @name
 * under the volume read lock, then hand them to
 * __volume_ctl_snapshot_cleanup_bh1__() with the lock released.
 *
 * @cent: volume cache entry
 * @name: snapshot name
 *
 * ENOENT from volume_proto_snapshot_cleanup_bh_dist() is accepted and results
 * in NULL destination pointers. Returns 0 on success or an error code.
 *
 * NOTE(review): err_ret issues ANALYSIS_END(0, ...) although it can be reached
 * both before ANALYSIS_BEGIN(0) (rdlock failure) and after ANALYSIS_END(0, ...)
 * has already run (failure of the bh1__ call) — confirm this is intended.
 */
STATIC int __volume_ctl_snapshot_cleanup_bh1(mcache_entry_t *cent, const char *name)
{
        int ret;
        volume_proto_t *volume_proto;
        nid_t prevnid, *_nid;
        volid_t previd, *_snapid;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_BEGIN(0);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto_snapshot_cleanup_bh_src(volume_proto, name, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = volume_proto_snapshot_cleanup_bh_dist(volume_proto, &chkinfo->id,
                        &prevnid, &previd);
        if (unlikely(ret)) {
                /* no destination: pass NULL pointers downstream */
                if (ret == ENOENT) {
                        _nid = NULL;
                        _snapid = NULL;
                } else
                        GOTO(err_lock, ret);
        } else {
                _nid = &prevnid;
                _snapid = &previd;
        }

        ANALYSIS_END(0, IO_WARN, NULL);

        __volume_ctl_unlock(cent);

        /* the per-chunk work takes the lock itself */
        ret = __volume_ctl_snapshot_cleanup_bh1__(cent, &chkinfo->diskid[0].id,
                        &chkinfo->id, _nid, _snapid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}
#endif

/*
 * Run volume_proto->snapshot_cleanup_bh2() for snapshot @name while holding
 * the volume write lock.
 *
 * @cent: volume cache entry
 * @name: snapshot name
 *
 * Returns 0 on success, or the error from taking the lock or from the
 * snapshot_cleanup_bh2() callback. ANALYSIS_END is issued on every exit.
 */
STATIC int __volume_ctl_snapshot_cleanup_bh2(mcache_entry_t *cent, const char *name)
{
        int ret;
        volume_proto_t *volume_proto;

        ANALYSIS_BEGIN(0);
        ret = __volume_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* cent->value is only read once the lock is held */
        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_cleanup_bh2(volume_proto, name);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}

/*
 * core_request() handler that finishes the cleanup of snapshot @name.
 *
 * Variadic arguments, in order:
 *   const volid_t *volid  - volume owning the snapshot
 *   const char *name      - snapshot name
 *
 * Steps:
 *   1. copy the pool name out of the volume under the read lock;
 *   2. (non-CHUNK builds only) run __volume_ctl_snapshot_cleanup_bh1(),
 *      tolerating ENOENT/ENOKEY;
 *   3. run __volume_ctl_snapshot_cleanup_bh2();
 *   4. remove the rmsnap task with rmsnap_bh_remove().
 * EAGAIN from step 3 or 4 releases the entry and restarts from the lookup
 * through USLEEP_RETRY.
 *
 * Returns 0 on success or an error code.
 */
STATIC int __volume_ctl_snapshot_cleanup_bh(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);
        va_end(ap);

        int ret, retry = 0;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        char pool[MAX_NAME_LEN];

        /*
         * Started before the first possible jump to err_ret, so that the
         * ANALYSIS_END there never runs without a matching begin (same layout
         * as __volume_ctl_snapshot_cleanup_bh2). As a consequence the measured
         * interval now also covers retries.
         */
        ANALYSIS_BEGIN(0);

        DINFO("cleanup snap "CHKID_FORMAT"/%s\n", CHKID_ARG(volid), name);

retry:
        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        /* bounded copy: never write past pool[], always NUL-terminated */
        snprintf(pool, sizeof(pool), "%s", volume_proto->table1.pool);

        __volume_ctl_unlock(cent);

#if !SNAPSHOT_CLEANUP_CHUNK
        ret = __volume_ctl_snapshot_cleanup_bh1(cent, name);
        if (unlikely(ret)) {
                if (ret == ENOENT || ret == ENOKEY) {
                        DINFO(""CHKID_FORMAT"/%s not exist, may be cleanuped, just continue\n",
                                        CHKID_ARG(volid), name);
                } else
                        GOTO(err_release, ret);
        }
#endif

        ret = __volume_ctl_snapshot_cleanup_bh2(cent, name);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        /* entry is released here, so give-up goes to err_ret */
                        __volume_ctl_release(cent);
                        USLEEP_RETRY(err_ret, ret, retry, retry, 3, (200 * 1000));
                } else
                        GOTO(err_release, ret);
        }

        /* remove the rmsnap task */
        ret = rmsnap_bh_remove(pool, volid, name);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        __volume_ctl_release(cent);
                        USLEEP_RETRY(err_ret, ret, retry, retry, 3, (200 * 1000));
                } else
                        GOTO(err_release, ret);
        }

        __volume_ctl_release(cent);

        DINFO("cleanup snap "CHKID_FORMAT"/%s success\n", CHKID_ARG(volid), name);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_release:
        __volume_ctl_release(cent);
err_ret:
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}

/*
 * core_request() handler: run volume_proto->snapshot_cleanup_direct() for
 * snapshot @name under the volume write lock.
 *
 * Variadic arguments, in order:
 *   const volid_t *volid  - volume owning the snapshot
 *   const char *name      - snapshot name
 *
 * Returns 0 on success, or the error from the lookup, the lock or the
 * snapshot_cleanup_direct() callback.
 */
STATIC int __volume_ctl_snapshot_cleanup_direct(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        const char *name = va_arg(ap, char *);

        va_end(ap);

        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;

        DINFO("unlink snap "CHKID_FORMAT"/%s\n", CHKID_ARG(volid), name);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_cleanup_direct(volume_proto, name);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
        /* error unwinding mirrors the acquisition order: lock, then entry */
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * Dispatch __volume_ctl_snapshot_cleanup_direct() for (@volid, @name) through
 * core_request() on the core selected by core_hash(volid).
 *
 * Returns 0 on success or the error reported by core_request().
 */
int volume_ctl_snapshot_cleanup_direct(const volid_t *volid, const char *name)
{
        int ret = core_request(core_hash(volid), -1, "snapshot_cleanup_direct",
                        __volume_ctl_snapshot_cleanup_direct, volid, name);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Remove the snapshots of @volid one by one, always taking the one returned
 * by volume_ctl_snapshot_last(), until that call reports ENOENT.
 *
 * Each removal goes through volume_ctl_snapshot_cleanup_direct(); EAGAIN is
 * retried via USLEEP_RETRY with a fresh retry counter per snapshot.
 *
 * Returns 0 once no snapshot is left, or the first fatal error code.
 */
STATIC int __volume_ctl_cleanup_snapshot(const volid_t *volid)
{
        int ret, retry = 0;
        nid_t snapnid;
        volid_t snapid;
        uint64_t version;
        char snapname[MAX_NAME_LEN];

        for (;;) {
                // TODO del snapshot
                ret = volume_ctl_snapshot_last(volid, &snapnid, &snapid, snapname, &version);
                if (ret == ENOENT)
                        break;          /* nothing left to remove */
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DINFO("cleanup snapshot "CHKID_FORMAT"/%s\n", CHKID_ARG(volid), snapname);

                retry = 0;
retry:
                ret = volume_ctl_snapshot_cleanup_direct(volid, snapname);
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 5, (100 * 1000));
                } else if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

#if 0
/*
 * Disabled (inside "#if 0"): core_request() handler that would call
 * volume_proto->snapshot_cleanup() under the volume read lock.
 *
 * Variadic argument: const volid_t *volid.
 * Returns 0 on success or an error code.
 */
STATIC int __volume_ctl_snapshot_cleanup(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        va_end(ap);

        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);
        ret = volume_proto->snapshot_cleanup(volume_proto);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * Disabled (inside "#if 0"): dispatch __volume_ctl_snapshot_cleanup() for
 * @volid through core_request().
 *
 * Returns 0 on success or the error reported by core_request().
 */
int volume_ctl_snapshot_cleanup(const volid_t *volid)
{
        int ret;

        ret = core_request(core_hash(volid), -1, "snapshot_cleanup", __volume_ctl_snapshot_cleanup,
                        volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}
#endif

/**
 * Background cleanup of snapshot @name of volume @volid.
 *
 * In SNAPSHOT_CLEANUP_CHUNK builds the per-chunk cleanup runs first
 * (ENOENT/ENOKEY mean the snapshot is already gone and are ignored); the
 * final stage is then dispatched through core_request().
 *
 * @todo if this is the last snapshot, this could be optimized: delete it
 *       directly
 *
 * @param volid volume owning the snapshot
 * @param name  snapshot name
 * @return 0 on success, otherwise an error code
 */
int volume_ctl_snapshot_cleanup_bh(const volid_t *volid, const char *name)
{
        int ret;

#if SNAPSHOT_CLEANUP_CHUNK
        // TODO for every chunk, save the data held by the snapshot being
        // deleted into its parent node
        ret = __volume_ctl_snapshot_cleanup_chunk(volid, name);
        if (ret == ENOENT || ret == ENOKEY) {
                DINFO(""CHKID_FORMAT"/%s not exist, may be cleanuped, just continue\n",
                                CHKID_ARG(volid), name);
        } else if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif

        // CHECKLIST:
        // - update the reference structure of the snapshot tree
        // - delete the snapshot
        // - delete useless auto/root snaps
        // - clear the task
        ret = core_request(core_hash(volid), -1, "snapshot_cleanup_bh",
                        __volume_ctl_snapshot_cleanup_bh, volid, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 *VOLUME
 */

#ifdef VOLUME_CLEANUP_DISCARD
/*
 * VOLUME_CLEANUP_DISCARD builds only: core_request() handler that reports the
 * chunk count of a volume.
 *
 * Variadic arguments, in order:
 *   const volid_t *volid  - volume to inspect
 *   int *chknum           - output, size2chknum() of the volume size
 *
 * The size is read from table1.fileinfo under the volume read lock.
 * Returns 0 on success or an error code.
 *
 * NOTE(review): size2chknum() takes a single argument here but (size, &ec) in
 * __volume_ctl_snapshot_cleanup_chunk() — confirm before enabling this build.
 */
STATIC int __volume_ctl_cleanup_self_chunk_bh1(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        table1_t *table1;
        const volid_t *volid = va_arg(ap, volid_t *);
        int *chknum = va_arg(ap, int *);

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        table1 = &volume_proto->table1;
        *chknum = size2chknum(table1->fileinfo.size);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);
        return 0;
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/*
 * VOLUME_CLEANUP_DISCARD builds only: core_request() handler that cleans one
 * chunk of a volume.
 *
 * Variadic arguments, in order:
 *   const volid_t *volid  - volume owning the chunk
 *   int i                 - chunk index, turned into a chkid with fid2cid()
 *
 * Calls volume_proto->cleanup_bh1() for that chkid under the volume read
 * lock. Returns 0 on success or an error code.
 */
STATIC int __volume_ctl_cleanup_self_chunk_bh2(va_list ap)
{
        int ret;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        chkid_t chkid;
        const volid_t *volid = va_arg(ap, volid_t *);
        int i = va_arg(ap, int);

        va_end(ap);

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_rdlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        fid2cid(&chkid, volid, i);
        ret = volume_proto->cleanup_bh1(volume_proto, &chkid);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);
        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}
#endif

/*
 * Final step of volume cleanup: run volume_proto->cleanup_bh2() under the
 * volume write lock and drop the cache entry.
 *
 * @cent: volume cache entry
 *
 * Returns 0 on success, or the error from taking the lock or from the
 * cleanup_bh2() callback. ANALYSIS_END is issued on every exit, matching
 * __volume_ctl_snapshot_cleanup_bh2().
 */
STATIC int __volume_ctl_cleanup_self_bh2(mcache_entry_t *cent)
{
        int ret;
        volume_proto_t *volume_proto;

        ANALYSIS_BEGIN(0);
        ret = __volume_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        // TODO volume_proto has been released but cent->value was not updated,
        // so it became a wild pointer; therefore volume_proto_destroy is not
        // executed at this point
        DWARN("cleanup_bh2 volume %p\n", volume_proto);
        ret = volume_proto->cleanup_bh2(volume_proto);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        // last step of the cleanup: get ready to release volume_proto
        mcache_drop_nolock(cent);

        __volume_ctl_unlock(cent);
        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_ret:
        /* close the analysis interval on failures too */
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}

/*
 * core_request() handler that removes the volume itself.
 *
 * Variadic argument: const volid_t *volid.
 *
 * With VOLUME_CLEANUP_DISCARD the work is delegated to
 * __volume_ctl_cleanup_self_bh2(). Otherwise (the configuration selected in
 * this file) volume_proto->suicide() is called under the write lock, ltime is
 * reset to 0 and the cache entry is dropped. In both branches EAGAIN releases
 * everything and restarts from the lookup through USLEEP_RETRY.
 *
 * Returns 0 on success or an error code.
 */
STATIC int __volume_ctl_cleanup_self__(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        va_end(ap);

        int ret, retry = 0;
        mcache_entry_t *cent;

retry:
        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("cleanup file "CHKID_FORMAT"\n", CHKID_ARG(volid));

#ifdef VOLUME_CLEANUP_DISCARD

        ret = __volume_ctl_cleanup_self_bh2(cent);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        /* entry already released: give-up goes to err_ret */
                        __volume_ctl_release(cent);
                        USLEEP_RETRY(err_ret, ret, retry, retry, 3, (100 * 1000));
                } else
                        GOTO(err_release, ret);
        }

#else
        volume_proto_t *volume_proto;

        ret = __volume_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;
        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        ret = volume_proto->suicide(volume_proto);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        /* lock and entry are both dropped before retrying */
                        __volume_ctl_unlock(cent);
                        __volume_ctl_release(cent);
                        USLEEP_RETRY(err_ret, ret, retry, retry, 3, (100 * 1000));
                } else
                        GOTO(err_lock, ret);
        }

        volume_proto->ltime = 0;

        /* dropped while the write lock is still held */
        mcache_drop_nolock(cent);
        __volume_ctl_unlock(cent);

#endif
        __volume_ctl_release(cent);
        return 0;

        /* err_lock only exists in the branch that takes the lock here */
#ifdef VOLUME_CLEANUP_DISCARD
#else
err_lock:
        __volume_ctl_unlock(cent);
#endif

err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

#ifdef VOLUME_CLEANUP_DISCARD
#else

/* Argument passed to __volume_cleanup_broadcast() for every node. */
typedef struct {
        /* id handed to offline_msg_create() for each node */
        chkid_t id;
        /* cleared to 0 by the callback when offline_msg_create() fails */
        int finish;
} iterate_arg_t;

/*
 * Per-node callback: create an offline message for the volume on @node.
 *
 * @node: nid_t of the node being visited
 * @_arg: iterate_arg_t carrying the volume id and the "finish" flag
 *
 * A failure of offline_msg_create() is recorded by clearing arg->finish and
 * logged as a warning; the success message is only logged when the call
 * actually succeeded. Always returns 0 so the caller's iteration continues.
 */
static int __volume_cleanup_broadcast(void *node, void *_arg)
{
        int ret;
        nid_t *nid = node;
        iterate_arg_t *arg = _arg;
        chkid_t *id = &arg->id;

        ret = offline_msg_create(nid, id);
        if (unlikely(ret)) {
                arg->finish = 0;
                DWARN("vol "CHKID_FORMAT" cleanup broadcast (%d) ret:%d\n", CHKID_ARG(id), nid->id, ret);
        } else {
                DINFO("vol "CHKID_FORMAT" cleanup broadcast (%d) succ\n", CHKID_ARG(id), nid->id);
        }

        return 0;
}
#endif

/*
 * Check whether "<root>/<volume id>" can be looked up in the volume's pool.
 *
 * @volume_proto: volume whose chkid and pool are used
 * @root:         directory prefix of the path to probe
 * @exist:        output, 1 when stor_lookup1() succeeds, 0 on ENOENT
 *
 * Returns 0 on success (including ENOENT), otherwise the lookup error;
 * *exist is 0 in that case.
 */
static int __check_bh_task(volume_proto_t *volume_proto, const char *root, int *exist)
{
        int ret;
        fileid_t found;
        char path[MAX_NAME_LEN];
        const volid_t *volid = &volume_proto->chkid;

        *exist = 0;

        snprintf(path, MAX_NAME_LEN, "%s/"CHKID_FORMAT, root, CHKID_ARG(volid));

        ret = stor_lookup1(volume_proto->table1.pool, path, &found);
        if (ret == 0) {
                *exist = 1;
        } else if (ret != ENOENT) {
                GOTO(err_ret, ret);
        }

        DINFO("check %s%s exist %d\n", volume_proto->table1.pool, path, *exist);

        return 0;
err_ret:
        return ret;
}

/* Argument passed to __snap_iterator(). */
typedef struct {
        /* pool name handed to rmsnap_bh_root() */
        char *pool;
        /* output flag, set to 1 when stor_rmvol() succeeds for an entry */
        int *exist;
} arg_t;

/*
 * Snapshot iterator callback used by __volume_ctl_cleanup_deps().
 *
 * @_arg:   arg_t with the pool name and the "exist" output flag
 * @_chkid: fileid_t used to build the entry name
 * @_snap:  snap_t being visited
 *
 * For snapshots whose key starts with LICH_SYSTEM_ATTR_UNLINK it removes the
 * entry "<chkid>:<key>" below the root returned by rmsnap_bh_root(). A
 * successful removal sets *arg->exist to 1; ENOENT from stor_rmvol() is
 * ignored; any other error is logged with its code and returned.
 *
 * Returns 0 on success or an error code.
 */
static int __snap_iterator(void *_arg, void *_chkid, void *_snap)
{
        int ret;
        size_t len;
        char name[MAX_NAME_LEN];
        snap_t *snap = _snap;
        fileid_t rootid, *fileid = _chkid;

        arg_t *arg = _arg;
        char *pool = arg->pool;
        int *exist = arg->exist;

        /* size_t keeps the length comparison below unsigned on both sides */
        len = strlen(LICH_SYSTEM_ATTR_UNLINK);

        if (strlen(snap->key) >= len && 0 == memcmp(snap->key, LICH_SYSTEM_ATTR_UNLINK, len)) {

                ret = rmsnap_bh_root(pool, &rootid);
                if (ret) {
                        GOTO(err_ret, ret);
                }

                snprintf(name, MAX_NAME_LEN, CHKID_FORMAT":%s", CHKID_ARG(fileid), snap->key);

                DINFO("unlink snap %s\n", name);

                ret = stor_rmvol(&rootid, name, 0);
                if (ret) {
                        if (ret != ENOENT) {
                                /* this branch is every failure except "not found" */
                                DWARN("cleanup snap %s unlink-> %s fail, ret %d\n", snap->key, name, ret);
                                GOTO(err_ret, ret);
                        }
                } else {
                        *exist = 1;
                }
        }

        DWARN("cleanup snap iterator %s "CHKID_FORMAT"\n", snap->key, CHKID_ARG(&snap->chkinfo->id));

        return 0;
err_ret:
        return ret;
}

/*
 * core_request() handler that prepares a volume for deletion.
 *
 * Variadic argument: const volid_t *volid.
 *
 * Under the volume write lock it:
 *   1. sets the "deleting" attribute through volume_proto->setattr();
 *   2. stops here for volumes flagged __FILE_ATTR_SNAPSHOT__;
 *   3. removes pending ROLLBACK_ROOT / FLAT_ROOT background tasks that
 *      __check_bh_task() reports as existing;
 *   4. walks the snapshots with __snap_iterator().
 *
 * Returns 0 on success or the first error code encountered.
 *
 * NOTE(review): exist3 is written by the iterator but never read here, and
 * the result of bh_task_remove() is not checked — confirm both are intended.
 */
static int __volume_ctl_cleanup_deps(va_list ap) {
        const volid_t *volid = va_arg(ap, volid_t *);
        va_end(ap);

        int ret, exist1, exist2, exist3 = 0;
        mcache_entry_t *cent;
        volume_proto_t *volume_proto;
        fileinfo_t *fileinfo;

        ret = __volume_ctl_get(&cent, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_ctl_wrlock(cent);
        if (unlikely(ret))
                GOTO(err_release, ret);

        volume_proto = cent->value;

        //set deleting attr, need persistent?
        {
                setattr_t setattr;
                memset(&setattr, 0x0, sizeof(setattr));
                setattr.deleting.set_it = 1;
                setattr.deleting.val = 1;
                ret = volume_proto->setattr(volume_proto, NULL, &setattr);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        fileinfo = &volume_proto->table1.fileinfo;

        // snapshots also enter direct-delete mode, i.e. /system/unlink
        // snapshot does not need to check bh_task
        if (fileinfo->attr & __FILE_ATTR_SNAPSHOT__) {
                goto out;
        }

        // terminate rollback, flat and similar tasks early (Job Control)
        ret = __check_bh_task(volume_proto, ROLLBACK_ROOT, &exist1);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        if (exist1) {
                bh_task_remove(volume_proto->table1.pool, ROLLBACK_ROOT, volid);
        }

        ret = __check_bh_task(volume_proto, FLAT_ROOT, &exist2);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        if (exist2) {
                bh_task_remove(volume_proto->table1.pool, FLAT_ROOT, volid);
        }

        arg_t arg = {
                .pool = volume_proto->table1.pool,
                .exist = &exist3,
        };

        // optimize rmsnap: switch from mark-delete mode to direct-delete mode
        // and make sure it completes
        ret = volume_proto->table1.snapshot_iterator2(&volume_proto->table1, __snap_iterator, &arg);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

out:
        __volume_ctl_unlock(cent);
        __volume_ctl_release(cent);

        return 0;
err_lock:
        __volume_ctl_unlock(cent);
err_release:
        __volume_ctl_release(cent);
err_ret:
        return ret;
}

/**
 * Public entry point for the dependency-cleanup step of a volume.
 *
 * Hands __volume_ctl_cleanup_deps to core_request() under the request name
 * "cleanup_deps", passing core_hash(volid) as the first argument and
 * forwarding volid to the handler.
 *
 * @param volid id of the volume whose dependencies are to be cleaned up
 * @return 0 on success, otherwise the non-zero code returned by core_request()
 */
int volume_ctl_cleanup_deps(const volid_t *volid)
{
        int ret = core_request(core_hash(volid), -1, "cleanup_deps",
                        __volume_ctl_cleanup_deps, volid);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

#ifdef VOLUME_CLEANUP_DISCARD
/**
 * Per-chunk cleanup of the volume itself; only compiled when
 * VOLUME_CLEANUP_DISCARD is defined.
 *
 * First issues the "volume_cleanup_chunk_bh1" request, which receives a
 * pointer to the local chunk counter, then issues one
 * "volume_cleanup_chunk_bh2" request per index in [0, counter).
 *
 * @param volid id of the volume being cleaned up
 * @return 0 when every request succeeded or failed only with ENOENT/ENOKEY,
 *         otherwise the first other non-zero code returned by core_request()
 */
STATIC int __volume_ctl_cleanup_self_chunk(const volid_t *volid)
{
        int ret;
        int idx, total;

        /* NOTE(review): bh1 presumably stores the chunk count in total -- confirm */
        ret = core_request(core_hash(volid), -1, "volume_cleanup_chunk_bh1",
                        __volume_ctl_cleanup_self_chunk_bh1, volid, &total);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        for (idx = 0; idx < total; idx++) {
                ret = core_request(core_hash(volid), -1, "volume_cleanup_chunk_bh2",
                                __volume_ctl_cleanup_self_chunk_bh2, volid, idx);

                /* success, ENOENT and ENOKEY all simply advance to the next index */
                if (!ret || ret == ENOENT || ret == ENOKEY) {
                        continue;
                }

                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}
#endif

/**
 * Self-cleanup step of a volume controller.
 *
 * When VOLUME_CLEANUP_DISCARD is defined, the per-chunk cleanup
 * (__volume_ctl_cleanup_self_chunk) runs first and its failure aborts the
 * step. Afterwards __volume_ctl_cleanup_self__ is submitted through
 * core_request() under the request name "cleanup_self".
 *
 * @param volid id of the volume being cleaned up
 * @return 0 on success, otherwise the non-zero code of the failing call
 */
STATIC int __volume_ctl_cleanup_self(const volid_t *volid)
{
        int ret;

#ifdef VOLUME_CLEANUP_DISCARD
        ret = __volume_ctl_cleanup_self_chunk(volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }
#endif

        /* then the controller removes itself (original note: "then suicide") */
        ret = core_request(core_hash(volid), -1, "cleanup_self",
                        __volume_ctl_cleanup_self__, volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Run __volume_cleanup_broadcast over the cluster node list for one volume.
 *
 * The iterator argument carries a copy of the volume id and a finish flag
 * that starts at 1. If the flag reads 0 once the iteration has returned
 * successfully, the call fails with EAGAIN.
 * NOTE(review): the callback presumably clears finish when a node could not
 * be handled yet -- confirm against __volume_cleanup_broadcast.
 *
 * @param volid id of the volume; its type must be __VOLUME_CHUNK__ (asserted)
 * @return 0 on success, EAGAIN when finish was cleared, otherwise the
 *         non-zero code returned by cluster_listnode_iterator1()
 */
int volume_ctl_cleanup_broadcast(const volid_t *volid)
{
        int ret;

        YASSERT(volid->type == __VOLUME_CHUNK__);

        iterate_arg_t bcast = {
                .id = *volid,
                .finish = 1,
        };

        ret = cluster_listnode_iterator1(__volume_cleanup_broadcast, (void *)&bcast);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* the iteration itself succeeded, but the flag was reset along the way */
        if (unlikely(bcast.finish == 0)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Full cleanup sequence for a volume: dependency cleanup, snapshot cleanup,
 * controller self-cleanup and finally the cluster-wide broadcast.
 *
 * Things that depend on the volume controller:
 * - connection
 * - snapshot
 * - task (create/rm snapshot, rollback, flat)
 *
 * In the last step, mcache_drop_nolock is called.
 *
 * @param volid id of the volume to clean up
 * @return 0 on success; otherwise the code of the first failing step (a
 *         failure of the self-cleanup step is only logged). If that code is
 *         EREMCHG and the volume no longer exists, the result of another
 *         volume_ctl_cleanup_broadcast() call is returned instead.
 */
int volume_ctl_cleanup(const volid_t *volid)
{
        int ret;

        /*
         * called by thread
         * TODO: make sure every task that depends on the volume controller
         * has finished, including (rmsnap, rollback, flat)
         */
        ret = volume_ctl_cleanup_deps(volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /*
         * A forced delete does not have to care about the snapshot-tree
         * maintenance done by the ordinary snapshot removal; it can delete
         * all data of every snapshot directly.
         * TODO: while snapshots exist, deleting the volume is not allowed;
         * a force-delete option could be added.
         */
        ret = __volume_ctl_cleanup_snapshot(volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /*
         * Self-destruct phase: once the work above is confirmed complete the
         * controller may remove itself, which ensures every task depending
         * on the volume controller is terminated.
         * TODO: offline replicas may exist during self-destruct (this could
         * actually be skipped, but the purpose of this step is that the
         * volume controller can no longer work; node-internal cleanup also
         * reclaims the metadata).
         */
        ret = __volume_ctl_cleanup_self(volid);
        if (unlikely(ret)) {
                /* deliberately carry on instead of jumping to err_ret */
                DWARN("volume ctl cleanup self fail, ret %d\n", ret);
        }

        /*
         * Notify every node of the current cluster through etcd, offline
         * nodes included; a node starts its cleanup automatically after it
         * scans the etcd message and handles that message itself once the
         * cleanup is done.
         */
        ret = volume_ctl_cleanup_broadcast(volid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        /* anything but "EREMCHG and the volume is gone" is reported unchanged */
        if (ret != EREMCHG || __volume_ctl_exist(volid))
                return ret;

        DWARN(""CHKID_FORMAT" already removed\n", CHKID_ARG(volid));

        /* the volume is already gone: the broadcast result is the final result */
        return volume_ctl_cleanup_broadcast(volid);
}

/**
 * core_request() handler behind volume_ctl_chunk_cleanup().
 *
 * Unpacks, in this order, (volid, chkid, nid, meta_version) from the
 * va_list, takes the volume controller entry under a read lock and calls the
 * volume's chunk_cleanup hook. The hook is not called when the volume's
 * fileinfo attr has __FILE_ATTR_DELETE__ set.
 *
 * @param ap argument list holding volid_t *, chkid_t *, nid_t *, uint64_t
 * @return 0 on success, ECANCELED when the volume is marked as deleting,
 *         otherwise the non-zero code of the failing call
 */
STATIC int __volume_ctl_chunk_cleanup(va_list ap)
{
        int ret, is_deleting;
        mcache_entry_t *entry;
        volume_proto_t *proto;

        /* the extraction order must match the order used by the caller */
        const volid_t *volid = va_arg(ap, volid_t *);
        const chkid_t *chkid = va_arg(ap, chkid_t *);
        const nid_t *nid = va_arg(ap, nid_t *);
        uint64_t meta_version = va_arg(ap, uint64_t);

        va_end(ap);

        ret = __volume_ctl_get(&entry, volid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = __volume_ctl_rdlock(entry);
        if (unlikely(ret)) {
                GOTO(err_release, ret);
        }

        proto = entry->value;
        YASSERT(proto->chkid.type == __VOLUME_CHUNK__);

        /* a volume that is already marked for deletion is left alone */
        is_deleting = proto->table1.fileinfo.attr & __FILE_ATTR_DELETE__;
        if (unlikely(is_deleting)) {
                ret = ECANCELED;
                GOTO(err_lock, ret);
        }

        ret = proto->chunk_cleanup(proto, chkid, nid, meta_version);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        /* success: drop the read lock, then the entry reference */
        __volume_ctl_unlock(entry);
        __volume_ctl_release(entry);

        return 0;
err_lock:
        __volume_ctl_unlock(entry);
err_release:
        __volume_ctl_release(entry);
err_ret:
        return ret;
}

/**
 * Public entry point for cleaning up one chunk of a volume.
 *
 * Submits __volume_ctl_chunk_cleanup through core_request() under the
 * request name "chunk_cleanup", forwarding all four arguments in the order
 * the handler unpacks them.
 *
 * @param volid        id of the owning volume; its type must be
 *                     __VOLUME_CHUNK__ (asserted)
 * @param chkid        id of the chunk, passed through to the handler
 * @param nid          node id, passed through to the handler
 * @param meta_version metadata version, passed through to the handler
 * @return 0 on success, otherwise the non-zero code returned by core_request()
 */
int volume_ctl_chunk_cleanup(const volid_t *volid, const chkid_t *chkid,
                const nid_t *nid, uint64_t meta_version)
{
        YASSERT(volid->type == __VOLUME_CHUNK__);

        int ret = core_request(core_hash(volid), -1, "chunk_cleanup",
                        __volume_ctl_chunk_cleanup,
                        volid, chkid, nid, meta_version);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}
